#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h> 
#include <string.h>

#include "msg.h"
#include "custom.h"

// A message queue together with the synchronization objects that guard it
// and a pipe descriptor pair.
typedef struct
{
	// Queue anchor: its list_ member is what gets passed to
	// QueueCreat/Enqueue/Dequeue/QueueSize as the queue.
	MsgNode msg_q_;
	// Held around every Enqueue/Dequeue/QueueSize on msg_q_.
	pthread_mutex_t msg_mutex_;
	// Signalled after a node is enqueued; the worker thread waits on it
	// while the queue is empty.
	pthread_cond_t msg_cv_;

	// Descriptors filled in by pipe(): [0] is the read end, [1] the write end.
	// Only InitSendQueue creates the pipe; the receive queue leaves this unused.
	int msg_pipe_[2];
}MsgQ;

// Send queue: filled by PushBack2SndQueue, drained by the GetMsgFromSndQ thread.
static MsgQ g_snd_q;
// Receive queue: filled by PushBack2RcvQueue, drained by the GetMsgFromRcvQ thread.
static MsgQ g_rcv_q;

// Thread entry points that drain the send and receive queue respectively.
static void *GetMsgFromSndQ(void *arg);
static void *GetMsgFromRcvQ(void *arg);

// Parses one received message and dispatches it to the handler for its event type.
static void HandleMsg(char *msg);

//--------------------send---------------------------------
// Initializes the send queue: the list, its mutex and condition variable,
// the pipe used to hand dequeued commands to a reader, and the worker thread
// (GetMsgFromSndQ) that drains the queue.
//
// Terminates the process with exit(-1) if the pipe or the thread cannot be
// created.
void InitSendQueue()
{
	QueueCreat(&g_snd_q.msg_q_.list_);
	pthread_mutex_init(&g_snd_q.msg_mutex_, NULL);
	pthread_cond_init(&g_snd_q.msg_cv_, NULL);

	// Create the pipe before starting the worker: the worker writes to
	// msg_pipe_[1], so the descriptor must be valid before it can run.
	if(pipe(g_snd_q.msg_pipe_) < 0)
	{
		perror("pipe");
		exit(-1);
	}

	pthread_t tid;
	// pthread_create reports failure by returning a positive error number;
	// it never returns a negative value and does not set errno, so the code
	// has to be tested against 0 and decoded with strerror().
	int ret = pthread_create(&tid, NULL, GetMsgFromSndQ, NULL);
	if (ret != 0)
	{
		fprintf(stderr, "pthread_create: %s\n", strerror(ret));
		exit(-1);
	}
}

// Appends msg to the send queue and wakes the thread waiting on the queue's
// condition variable. The enqueue and the signal both happen while the queue
// mutex is held.
void PushBack2SndQueue(MsgNode *msg)
{
	pthread_mutex_t *lock = &g_snd_q.msg_mutex_;
	pthread_cond_t *not_empty = &g_snd_q.msg_cv_;

	pthread_mutex_lock(lock);
	Enqueue(&msg->list_, &g_snd_q.msg_q_.list_);
	pthread_cond_signal(not_empty);
	pthread_mutex_unlock(lock);
}

// Worker thread for the send queue.
//
// Loops forever: waits until the send queue is non-empty, removes the head
// node, logs it, and writes a Command (sequence number plus a pointer to the
// heap-allocated message text) to the write end of the send pipe.
//
// Ownership: the message string is handed over to whoever reads the Command
// from the pipe; only the node itself is freed here. If the write fails the
// string is freed here instead so it does not leak.
// NOTE(review): the reader of GetSndPipeFd() is presumably expected to free
// cmd.msg_ - confirm against the consumer.
//
// arg is unused. The function never returns in practice.
static void *GetMsgFromSndQ(void *arg)
{
	(void)arg;

	MsgNode * first = NULL;

	while(1)
	{
		pthread_mutex_lock(&g_snd_q.msg_mutex_);

		// Re-check the predicate after every wakeup: pthread_cond_wait may
		// return spuriously.
		while(QueueSize(&g_snd_q.msg_q_.list_) == 0)
			pthread_cond_wait(&g_snd_q.msg_cv_, &g_snd_q.msg_mutex_);

		first = GET_QUEUE_HEAD(MsgNode, &g_snd_q.msg_q_.list_, list_);
		Dequeue(&g_snd_q.msg_q_.list_);

		pthread_mutex_unlock(&g_snd_q.msg_mutex_);

		char *msg = first->cmd_.msg_;
		int seq = first->cmd_.seq_;

		printf("GetMsgFromSndQ, seq:%d, msg:%s\n", seq, msg);

		// Zero the whole struct first so no indeterminate bytes (padding or
		// fields not set below) are copied into the pipe.
		Command cmd;
		memset(&cmd, 0, sizeof(cmd));
		cmd.seq_ = seq;
		// Transfer the existing heap string instead of duplicating it and
		// freeing the original: same result for the reader, and no
		// allocation that could fail.
		cmd.msg_ = msg;

		ssize_t written = write(g_snd_q.msg_pipe_[1], (void *)&cmd, sizeof(cmd));
		if (written != (ssize_t)sizeof(cmd))
		{
			if (written < 0)
				perror("write");
			else
				fprintf(stderr, "GetMsgFromSndQ: short write to pipe (%zd of %zu bytes)\n",
						written, sizeof(cmd));

			// The reader never received a complete Command, so nobody else
			// will free the string.
			free(cmd.msg_);
		}

		free(first);
	}

	return NULL;
}

// Returns the read end of the send pipe, i.e. the descriptor on which the
// Command records written by the send-queue worker thread arrive.
int GetSndPipeFd()
{
	const int read_end = g_snd_q.msg_pipe_[0];

	return read_end;
}

//--------------------recv---------------------------------
// Initializes the receive queue: the list, its mutex and condition variable,
// and the worker thread (GetMsgFromRcvQ) that drains the queue.
//
// Terminates the process with exit(-1) if the thread cannot be created.
void InitRecvQueue()
{
	QueueCreat(&g_rcv_q.msg_q_.list_);
	pthread_mutex_init(&g_rcv_q.msg_mutex_, NULL);
	pthread_cond_init(&g_rcv_q.msg_cv_, NULL);

	pthread_t tid;
	// pthread_create reports failure by returning a positive error number;
	// it never returns a negative value and does not set errno, so the code
	// has to be tested against 0 and decoded with strerror().
	int ret = pthread_create(&tid, NULL, GetMsgFromRcvQ, NULL);
	if (ret != 0)
	{
		fprintf(stderr, "pthread_create: %s\n", strerror(ret));
		exit(-1);
	}
}

// Logs msg, appends it to the receive queue and wakes the thread waiting on
// the queue's condition variable. The log line, the enqueue and the signal all
// happen while the queue mutex is held.
void PushBack2RcvQueue(MsgNode *msg)
{
	pthread_mutex_t *lock = &g_rcv_q.msg_mutex_;
	pthread_cond_t *not_empty = &g_rcv_q.msg_cv_;

	pthread_mutex_lock(lock);

	printf("PushBack2RcvQueue, msg: %s\n", msg->cmd_.msg_);
	Enqueue(&msg->list_, &g_rcv_q.msg_q_.list_);
	pthread_cond_signal(not_empty);

	pthread_mutex_unlock(lock);
}

// Worker thread for the receive queue.
//
// Loops forever: waits until the receive queue is non-empty, removes the head
// node, logs it, passes the message text to HandleMsg, then frees both the
// text and the node. arg is not used.
static void *GetMsgFromRcvQ(void * arg)
{
	for(;;)
	{
		MsgNode *node = NULL;
		char *text;
		int seq_no;

		pthread_mutex_lock(&g_rcv_q.msg_mutex_);

		printf("GetMsgFromRcvQ, waiting for msg----------\n");

		// Sleep until a producer has enqueued something; the predicate is
		// re-evaluated after every wakeup.
		while(0 == QueueSize(&g_rcv_q.msg_q_.list_))
		{
			pthread_cond_wait(&g_rcv_q.msg_cv_, &g_rcv_q.msg_mutex_);
		}

		// Detach the head node while still holding the lock.
		node = GET_QUEUE_HEAD(MsgNode, &g_rcv_q.msg_q_.list_, list_);
		Dequeue(&g_rcv_q.msg_q_.list_);

		pthread_mutex_unlock(&g_rcv_q.msg_mutex_);

		text = node->cmd_.msg_;
		seq_no = node->cmd_.seq_;

		printf("GetMsgFromRcvQ, seq:%d, msg:%s\n", seq_no, text);

		// Process the message outside the lock so producers are not blocked.
		HandleMsg(text);

		// The node and its text are owned by this thread once dequeued.
		free(text);
		free(node);
	}

	return NULL;
}

// Parses a received message with ParseEvent and dispatches it to the handler
// matching the returned event type. Event types without a handler are ignored.
//
// msg: message text taken from the receive queue.
// NOTE(review): data and id are presumably filled in by ParseEvent with the
// payload and the sender id - confirm against ParseEvent.
static void HandleMsg(char *msg)
{
	char data[1024] = {0};
	char id[64] = {0};

	event_type_e event = ParseEvent(msg, data, sizeof(data), id, sizeof(id));

	switch(event)
	{
		case EVENT_TYPE_CONN_REQ:
			HandleConnReq(data, id);
			break;
		case EVENT_TYPE_CONN_RESP:
			HandleConnResp(data);
			// Without this break a connection response fell through and was
			// also processed as a device list.
			break;
		case EVENT_TYPE_LIST_DEVICE:
			HandleDevList(data);
			break;
		default:
			break;
	}
}

